package drds.plus.repository.mysql.spi;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import drds.plus.common.lifecycle.AbstractLifecycle;
import drds.plus.common.model.DataNode;
import drds.plus.datanode.api.DatasourceManager;
import drds.plus.executor.ExecuteContext;
import drds.plus.executor.command_handler.CommandHandlerFactory;
import drds.plus.executor.cursor.cursor_factory.CursorFactory;
import drds.plus.executor.cursor.cursor_factory.CursorFactoryImpl;
import drds.plus.executor.data_node_executor.spi.DataNodeExecutor;
import drds.plus.executor.repository.IDatasourceManagerGetter;
import drds.plus.executor.repository.Repository;
import drds.plus.executor.repository.RepositoryConfig;
import drds.plus.executor.table.ITable;
import drds.plus.executor.table.ITemporaryTable;
import drds.plus.executor.transaction.Transaction;
import drds.plus.repository.mysql.handler.CommandHandlerFactoryImpl;
import drds.plus.sql_process.abstract_syntax_tree.configuration.TableMetaData;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.ExecutePlan;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;

import java.util.concurrent.ExecutionException;

/**
 * MySQL-backed {@link Repository} implementation.
 *
 * <p>Holds two lazily populated Guava caches that are built in {@link #doInit()}:
 * <ul>
 *   <li>data node id -&gt; (table meta data -&gt; {@link ITable}), and</li>
 *   <li>{@link DataNode} -&gt; {@link DataNodeExecutor}.</li>
 * </ul>
 * Both caches are {@code null} until {@link #doInit()} has run.
 */
@Slf4j
public class RepositoryImpl extends AbstractLifecycle implements Repository {
    /**
     * @return the logger generated for this class by Lombok's {@code @Slf4j}
     */
    public Logger log() {
        return log;
    }

    /** Per data node id, a cache of table handles keyed by their meta data. */
    protected LoadingCache<String, LoadingCache<TableMetaData, ITable>> dataNodeIdToTableMetaDataToTableCacheCache;
    /** Executors created on first use for each data node. */
    protected LoadingCache<DataNode, DataNodeExecutor> dataNodeToDataNodeExecutorLoadingCache;
    protected RepositoryConfig repositoryConfig;
    protected CursorFactoryImpl cursorFactoryMy;
    protected CommandHandlerFactory commandHandlerFactory = null;
    /** Used by the table cache loader to resolve the datasource manager of a data node. */
    protected IDatasourceManagerGetter datasourceManagerGetter = new DatasourceManagerGetterImpl();


    /**
     * Builds the repository configuration, the cursor and command handler factories, and the
     * two loading caches. Nothing is loaded eagerly; cache entries are created on first access.
     */
    public void doInit() {
        this.repositoryConfig = new RepositoryConfig();
        this.repositoryConfig.setProperty(RepositoryConfig.DEFAULT_TXN_ISOLATION, "READ_COMMITTED");
        this.repositoryConfig.setProperty(RepositoryConfig.IS_TRANSACTIONAL, "true");
        cursorFactoryMy = new CursorFactoryImpl();
        commandHandlerFactory = new CommandHandlerFactoryImpl();

        dataNodeIdToTableMetaDataToTableCacheCache = CacheBuilder.newBuilder().build(new CacheLoader<String, LoadingCache<TableMetaData, ITable>>() {

            @Override
            public LoadingCache<TableMetaData, ITable> load(final String dataNodeId) throws Exception {
                // One inner cache per data node; its entries are built from the table meta data.
                return CacheBuilder.newBuilder().build(new CacheLoader<TableMetaData, ITable>() {

                    @Override
                    public ITable load(TableMetaData tableMetaData) throws Exception {
                        try {
                            DatasourceManager datasourceManager = datasourceManagerGetter.getDatasourceManager(dataNodeId);
                            Table table = new Table(dataNodeId, datasourceManager, tableMetaData);
                            table.setSelect(false);
                            return table;
                        } catch (Exception ex) {
                            // Every failure leaves this loader as an unchecked exception.
                            throw new RuntimeException(ex);
                        }
                    }

                });
            }
        });

        dataNodeToDataNodeExecutorLoadingCache = CacheBuilder.newBuilder().build(new CacheLoader<DataNode, DataNodeExecutor>() {

            @Override
            public DataNodeExecutor load(DataNode dataNode) throws Exception {
                DatasourceManager datasourceManager = new DatasourceManager(dataNode.getApplicationId(), dataNode.getId());
                datasourceManager.init();

                drds.plus.repository.mysql.executor.DataNodeExecutor dataNodeExecutor = new drds.plus.repository.mysql.executor.DataNodeExecutor(getRepository());
                dataNodeExecutor.setDataNode(dataNode);
                dataNodeExecutor.setDatasourceManager(datasourceManager);
                dataNodeExecutor.init();
                return dataNodeExecutor;
            }
        });
    }

    /**
     * @return the repository instance handed to newly created data node executors
     */
    protected Repository getRepository() {
        return this;
    }

    /**
     * Drops all cached tables and destroys every cached data node executor.
     *
     * <p>Every executor is attempted even if an earlier one fails. The first failure is
     * rethrown once all executors have been processed, with later failures attached as
     * suppressed exceptions. Safe to call when {@link #doInit()} never ran.
     *
     * @throws RuntimeException the first failure raised by an executor's {@code destroy()}
     */
    protected void doDestroy() throws RuntimeException {
        if (dataNodeIdToTableMetaDataToTableCacheCache != null) {
            // cleanUp() only performs pending maintenance; invalidateAll() actually discards entries.
            dataNodeIdToTableMetaDataToTableCacheCache.invalidateAll();
        }
        if (dataNodeToDataNodeExecutorLoadingCache == null) {
            return;
        }

        RuntimeException firstFailure = null;
        for (DataNodeExecutor dataNodeExecutor : dataNodeToDataNodeExecutorLoadingCache.asMap().values()) {
            try {
                dataNodeExecutor.destroy();
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        // Do not keep handing out executors that have already been destroyed.
        dataNodeToDataNodeExecutorLoadingCache.invalidateAll();

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Returns the table handle for the given data node and meta data, creating and caching it
     * on first access. Temporary tables are delegated to {@link #getTemporaryTable(TableMetaData)}.
     *
     * @param dataNodeId    id of the data node that hosts the table
     * @param tableMetaData meta data describing the table
     * @return the cached or newly created table
     * @throws RuntimeException if the table cannot be loaded
     */
    public ITable getTable(final String dataNodeId, final TableMetaData tableMetaData) throws RuntimeException {
        if (tableMetaData.isTemporaryTable()) {
            return getTemporaryTable(tableMetaData);
        } else {
            try {
                return dataNodeIdToTableMetaDataToTableCacheCache.get(dataNodeId).get(tableMetaData);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Temporary tables are not supported by this repository.
     *
     * @param tableMetaData meta data of the requested temporary table (unused)
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public ITemporaryTable getTemporaryTable(TableMetaData tableMetaData) throws RuntimeException {
        throw new UnsupportedOperationException("temp where is not supported by mysql repository");
    }

    /**
     * @return the configuration created in {@link #doInit()}, or {@code null} before initialization
     */
    public RepositoryConfig getRepositoryConfig() {
        return repositoryConfig;
    }


    /**
     * @return the cursor factory created in {@link #doInit()}, or {@code null} before initialization
     */
    public CursorFactory getCursorFactory() {
        return cursorFactoryMy;
    }

    /**
     * @return the command handler factory created in {@link #doInit()}, or {@code null} before initialization
     */
    public CommandHandlerFactory getCommandExecutorFactory() {
        return commandHandlerFactory;
    }


    /**
     * Returns the executor for the given data node, creating and initializing it (together with
     * its datasource manager) on first access.
     *
     * @param dataNode the data node to execute against
     * @return the cached or newly created executor
     * @throws RuntimeException if the executor cannot be created
     */
    public DataNodeExecutor getDataNodeExecutor(final DataNode dataNode) {
        try {
            return dataNodeToDataNodeExecutorLoadingCache.get(dataNode);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }

    }

    /**
     * Creates a {@link JdbcHandler} bound to the plan's data node and the context's transaction.
     *
     * @param executeContext          execution context; must carry a transaction
     * @param executePlan             plan whose data node id selects the datasource manager
     * @param datasourceManagerGetter getter used for this call; note that it shadows the field
     *                                of the same name, which is not consulted here
     * @return a fully populated handler
     * @throws IllegalAccessError if the execution context has no transaction
     */
    public JdbcHandler getJdbcHandler(ExecuteContext executeContext, ExecutePlan executePlan, IDatasourceManagerGetter datasourceManagerGetter) {

        JdbcHandler jdbcHandler = new JdbcHandler(executeContext);

        DatasourceManager datasourceManager = datasourceManagerGetter.getDatasourceManager(executePlan.getDataNodeId());
        Transaction transaction = executeContext.getTransaction();
        if (transaction == null) {
            // Error type kept as-is for compatibility with existing callers.
            throw new IllegalAccessError("impossible, txn is null");
        }
        jdbcHandler.setDatasourceManager(datasourceManager);
        jdbcHandler.setDataNodeId(executePlan.getDataNodeId());
        jdbcHandler.setTransaction(transaction);
        jdbcHandler.setExecutePlan(executePlan);
        return jdbcHandler;
    }


}
